"use strict";
/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// ���ݵ�������
// _read����push����
// ��̳�
// object
Object.defineProperty(exports, "__esModule", { value: true });
// ���ݵ����ѣ�����ģʽ ��ͣģʽ
// utils state
// NODE_DEBUG=stream node streamTest.js
const stream_1 = require("./stream");
//ArrayBuffer
const kAutoDestory = 1 << 2;
const kEmitClose = 1 << 3;
const kConstructed = 1;
const kDestroyed = 1 << 4;
const kErrored = 1 << 7;
// origin
//const bitfield = Symbol.for('bitfieldSymbol');
const kObjectMode = 1 << 0;
const kSync = 1 << 12;
const kEnded = 1 << 9; //������
const kEndEmitted = 1 << 10; // stream end ������� close error
const kReading = 1 << 11;
const kNeedReadable = 1 << 13;
const kEmittedReadable = 1 << 14;
const kReadableListening = 1 << 15;
const kResumeScheduled = 1 << 16;
const kReadingMore = 1 << 18;
const kDataEmitted = 1 << 19;
const kHasFlowing = 1 << 22;
const kFlowing = 1 << 24;
const kHasPaused = 1 << 25;
const kPaused = 1 << 26;
const kDataListening = 1 << 27; //data�¼�
let defaultHighWaterMarkBytes = 16 * 1024;
let defaultHighWaterMarkObjectMode = 16;
function runNextTick(fn, ...args) {
    Promise.resolve(args).then((args) => fn(...args));
}
function debug(name, ...args) {
    var info = name;
    let count = args.length;
    for (let i = 0; i < count; i++) {
        info += ' ';
        info += args[i];
    }
    console.log("stream " + info);
}
class Buffer {
}
class Options {
    constructor() {
        this.objectMode = false;
        this.objectReableObjectMode = false;
    }
}
//const readable; ��ص�״̬
//
//const data ��ص�״̬
//���������ͣģʽ��Ȼ����������ģʽ��buffer��ô��ȡ
//ʲô��������»����nextTick, ��ص�function��emitReadable, maybeReadMore,endFn,ReadingNextTick,updateReadableListening
//,_resume, endReadable, _endWriteable
//watermark�ж�
class ReadableState {
    constructor(options, stream, isDuplex) {
        this.state = 0;
        this.buffer = [];
        this.length = 0;
        this.pipes = [];
        this.bufferIndex = 0;
        this.bitfield = 0;
        //warning: highwater��������
        this.highWaterMark = defaultHighWaterMarkBytes;
        this.bitfield = kEmitClose | kAutoDestory | kConstructed | kSync;
        if (options && options.objectMode) {
            this.bitfield |= kObjectMode;
        }
        if (isDuplex && options && options.objectReableObjectMode) {
            this.bitfield |= kObjectMode;
        }
        this.highWaterMark = options.objectMode ?
            defaultHighWaterMarkObjectMode :
            defaultHighWaterMarkBytes;
    }
}
class Readable extends stream_1.Stream {
    constructor(options) {
        super();
        this._readableState = new ReadableState(options, this, false);
    }
    _read(n) {
        console.log("don't directly call _read");
    }
    push(chunk) {
        debug('push', chunk);
        const state = this._readableState;
        return (state.bitfield & kObjectMode) === 0 ?
            readableAddchunkPushByteMode(this, state, chunk) :
            readableAddChunkPushObjectMode(this, state, chunk);
    }
    // emit(name: string, data: chunk = null) {
    //     let exsiting = this._events.get(name);
    //     if (typeof exsiting == "function") {
    //         exsiting(data);
    //     }
    // }
    read(n) {
        debug('read', n);
        if (n === undefined) {
            n = NaN;
        }
        else if (!Number.isInteger(n)) {
            n = Number.parseInt(n.toString(), 10);
        }
        const state = this._readableState;
        const oriN = n;
        if (n > state.highWaterMark) {
            state.highWaterMark = computNewHighWaterMark(n);
        }
        if (n !== 0) {
            state.bitfield &= ~kEmittedReadable;
        }
        if (n === 0 &&
            (state.bitfield & kNeedReadable) !== 0 &&
            ((state.highWaterMark !== 0 ?
                state.length >= state.highWaterMark :
                state.length > 0) ||
                (state.bitfield & kEnded) !== 0)) {
            debug('read: emitReadable');
            if (state.length === 0 && (state.bitfield & kEnded) !== 0) {
                endReadable(this);
            }
            else {
                emitReadable(this);
            }
            return null;
        }
        n = howMuchToRead(n, state);
        if (n === 0 && (state.bitfield & kEnded) !== 0) {
            if (state.length === 0) {
                endReadable(this);
            }
            return null;
        }
        //doRead
        let doRead = (state.bitfield & kNeedReadable) !== 0;
        debug('need readable', doRead);
        if (state.length === 0 || state.length - n < state.highWaterMark) {
            doRead = true;
            debug('length less than watermart', doRead);
        }
        if ((state.bitfield & (kReading | kEnded | kDestroyed | kErrored | kConstructed)) !== kConstructed) {
            doRead = false;
            debug('reading, ended or constructing', doRead);
        }
        else if (doRead) {
            state.bitfield |= kReading | kSync;
            if (state.length === 0) {
                state.bitfield |= kNeedReadable;
            }
            try {
                this._read(state.highWaterMark);
            }
            catch (err) {
            }
            state.bitfield &= ~kSync;
            if ((state.bitfield & kReading) === 0) {
                n = howMuchToRead(oriN, state);
            }
        }
        let ret;
        if (n > 0) {
            ret = this.fromList(n, state);
        }
        else {
            ret = null;
        }
        if (ret === null) {
            state.bitfield |= state.length <= state.highWaterMark ? kNeedReadable : 0;
            n = 0;
        }
        else {
            state.length -= n;
            //kMultiAwaitDrain
        }
        if (state.length === 0) {
            if ((state.bitfield & kEnded) === 0) {
                state.bitfield |= kNeedReadable;
            }
            if (oriN !== n && (state.bitfield & kEnded) !== 0) {
                endReadable(this);
            }
        }
        if (ret !== null && (state.bitfield & (stream_1.kErrorEmitted | stream_1.kCloseEmitted)) === 0) {
            state.bitfield |= kDataEmitted;
            this.emit('data', ret);
        }
        return ret;
    }
    resume() {
        const state = this._readableState;
        if ((state.bitfield & kFlowing) === 0) {
            state.bitfield |= kHasFlowing;
            //ֻ��û��readable�¼�������ʱ�����������������Ȼ�����reume
            if ((state.bitfield & kReadableListening) === 0) {
                state.bitfield |= kFlowing;
            }
            else {
                state.bitfield &= ~kFlowing;
            }
            resume_(this, state);
        }
        state.bitfield |= kHasPaused;
        state.bitfield &= ~kHasPaused;
        return this;
    }
    on(ev, fn) {
        super.on(ev, fn);
        const state = this._readableState;
        if (ev === 'data') {
            state.bitfield |= kDataListening;
            state.bitfield |= this.listenerCount("readable") > 0 ? kReadableListening : 0;
            if ((state.bitfield & (kHasFlowing | kFlowing)) !== kHasFlowing) {
                this.resume();
            }
        }
        else if (ev === 'readable') {
            if ((state.bitfield & (kEndEmitted | kReadableListening)) === 0) {
                state.bitfield |= kReadableListening | kNeedReadable | kHasFlowing;
                state.bitfield &= ~(kFlowing | kEmittedReadable);
                if (state.length) {
                    emitReadable(this);
                }
                else if ((state.bitfield & kReading) === 0) {
                }
            }
        }
    }
    fromList(n, state) {
        if (state.length === 0) {
            return null;
        }
        let index = state.bufferIndex;
        let ret;
        const buffer = state.buffer;
        const length = buffer.length;
        if ((state.bitfield & kObjectMode) !== 0) {
            ret = buffer[index];
            buffer[index++] = null;
        }
        if (index === length) {
            state.buffer.length = 0;
            state.bufferIndex = 0;
        }
        else if (index > 1024) {
            state.buffer.splice(0, index);
            state.bufferIndex = 0;
        }
        else {
            state.bufferIndex = index;
        }
        return ret;
    }
}
const MAX_HWM = 0X40000000;
function computNewHighWaterMark(n) {
    if (n > MAX_HWM) {
        throw new Error("out of watermark range");
    }
    else {
        n--;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        n++;
    }
    return n;
}
function resume(stream, state) {
    if ((state.bitfield & kResumeScheduled) === 0) {
        state.bitfield |= kResumeScheduled;
        runNextTick(resume_, stream, state);
    }
}
function resume_(stream, state) {
    if ((state.bitfield & kReading) === 0) {
        stream.read(0);
    }
    state.bitfield &= ~kResumeScheduled;
    stream.emit('resume');
    flow(stream);
    if ((state.bitfield & (kFlowing | kReading)) === kFlowing) {
        stream.read(0);
    }
}
function howMuchToRead(n, state) {
    if (n <= 0 || (state.length === 0 && (state.bitfield & kEnded) !== 0)) {
        return 0;
    }
    if ((state.bitfield & kObjectMode) !== 0) {
        return 1;
    }
    if (Number.isNaN(n)) {
        if ((state.bitfield & kFlowing) !== 0 && state.length) {
            return state.buffer[state.bufferIndex].length;
        }
        return state.length;
    }
    if (n <= state.length) {
        return n;
    }
    return (state.bitfield & kEnded) !== 0 ? state.length : 0;
}
function emitReadable(stream) {
    const state = stream._readableState;
    debug('emitReadable');
    state.bitfield &= ~kNeedReadable;
    if ((state.bitfield & kEmittedReadable) === 0) {
        debug('emitReadable', (state.bitfield & kFlowing) !== 0);
        state.bitfield |= kEmittedReadable;
        runNextTick(emitReadable_, stream);
    }
}
function emitReadable_(stream) {
    const state = stream._readableState;
    debug('emitReadable_');
    if ((state.bitfield & (kDestroyed | kErrored)) === 0 && (state.length || (state.bitfield & kEnded) !== 0)) {
        stream.emit("readable");
        state.bitfield &= ~kEmittedReadable;
    }
    state.bitfield |=
        (state.bitfield & (kFlowing | kEnded)) === 0 &&
            state.length <= state.highWaterMark ? kNeedReadable : 0;
    flow(stream);
}
function flow(stream) {
    const state = stream._readableState;
    debug('flow');
    while ((state.bitfield & kFlowing) !== 0 && stream.read() !== null)
        ;
}
function readableAddChunkPushObjectMode(stream, state, chunk) {
    if (chunk === null) {
        state.bitfield &= ~kReading;
        onEofChunk(stream, state);
        return false;
    }
    if ((state.bitfield & kEnded) !== 0) {
        throw Error("stream.push after EOF");
        return false;
    }
    if ((state.bitfield & (kDestroyed | kErrored)) !== 0) {
        return false;
    }
    state.bitfield &= ~kReading;
    //kDecoder
    addChunk(stream, state, chunk, false);
    return canPushMore(state);
}
function readableAddchunkPushByteMode(stream, state, chunk) {
    if (chunk === null) {
        state.bitfield &= ~kReading;
        return false;
    }
    if (!chunk || chunk.length <= 0) {
        state.bitfield &= ~kReading;
    }
    addChunk(stream, state, chunk, false);
}
function canPushMore(state) {
    return (state.bitfield & kEnded) === 0 &&
        (state.length < state.highWaterMark || state.length === 0);
}
function onEofChunk(stream, state) {
    debug('onEofChunk');
    if ((state.bitfield & kEnded) !== 0)
        return;
    state.bitfield |= kEnded;
    if ((state.bitfield & kSync) !== 0) {
        emitReadable(stream);
    }
    else {
        state.bitfield &= ~kNeedReadable;
        state.bitfield |= kEmittedReadable;
        emitReadable_(stream);
    }
}
function addChunk(stream, state, chunk, addToFront) {
    //kSync  ��ϣ��_read������ȥ����read�� ��д�����
    if ((state.bitfield & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) {
        state.bitfield != kDataEmitted;
        stream.emit('data', chunk);
    }
    else {
        state.length += (state.bitfield & kObjectMode) !== 0 ? 1 : chunk.length;
        if (addToFront) {
            //todo
        }
        else {
            state.buffer.push(chunk);
        }
    }
    if ((state.bitfield & kNeedReadable) !== 0) {
        emitReadable(stream);
    }
    // maybereadmore
    maybeReadMore(stream, state);
}
function maybeReadMore(stream, state) {
    if ((state.bitfield & (kReadingMore | kConstructed)) === kConstructed) {
        state.bitfield |= kReadingMore;
        runNextTick(maybeReadMore_, stream, state);
    }
}
function maybeReadMore_(steam, state) {
    while ((state.bitfield & (kReading | kEnded)) === 0 &&
        (state.length < state.highWaterMark ||
            ((state.bitfield & kFlowing) !== 0 && state.length === 0))) {
        const len = state.length;
        steam.read(0);
        if (len === state.length) {
            break;
        }
        state.bitfield &= ~kReadingMore;
    }
}
function endReadable(stream) {
    console.log("endReadable");
    const state = stream._readableState;
    if ((state.bitfield & kEndEmitted) === 0) {
        state.bitfield |= kEnded;
        runNextTick(endReadableNT, stream, state);
    }
}
function endReadableNT(stream, state) {
    console.log("endReadableNT state.length:" + state.length);
    if ((state.bitfield & (kErrored | stream_1.kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) {
        state.bitfield |= kEndEmitted;
        stream.emit("end");
    }
}
/*test*/
class TestReadable extends Readable {
    _read(chunk) {
        console.log("chenlin _read");
        if (chunk === 0) {
            this.push(null);
        }
        else {
            this.push(chunk);
        }
    }
}
let options = new Options();
options.objectMode = true;
let readable = new TestReadable(options);
readable.on("end", () => {
    console.log("chenlin end");
});
readable.on("close", () => {
    console.log("chenlin close");
});
//console.log(readable);
readable._read(1);
readable._read(2);
readable._read(0);
//readable._read(1);
console.log(readable.read());
console.log(readable.read());
